package wsz.rpc.zk;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import wsz.rpc.common.ConstantsUtils;
import wsz.rpc.consumer.client.RpcClient;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Consumer-side service discovery on top of ZooKeeper (via Curator).
 *
 * <p>On construction the class connects to ZooKeeper, creates one {@link RpcClient} per child of
 * {@code ConstantsUtils.PARENT_NODE} (child names are expected to look like {@code host:port}),
 * watches that parent for added / removed / updated children, and starts a periodic task that
 * writes the most recent request timings recorded through {@link #handleTime} back into the
 * corresponding child nodes.
 *
 * <p>The rotation list is guarded by this object's monitor: {@link #randomClient()} and every
 * mutation performed by the watcher take the same lock.
 *
 * @author wsz
 * @date 2021/12/8 9:43
 **/
public class ZkClient {
    /** Handling time in milliseconds above which a node is reported as timed out. */
    private static final long HANDLE_TIMEOUT_MS = 5000L;

    private CuratorFramework client;

    /** Watcher on the parent node; kept in a field so {@link #close()} can release it. */
    private PathChildrenCache childrenCache;

    /**
     * Latest timing sample per node name, waiting to be written to ZooKeeper by {@link ZkTask}.
     * A concurrent map is used because {@link #handleTime} and the scheduled task run on
     * different threads. Stored samples are never mutated after insertion.
     */
    Map<String, JSONObject> timeCahe = new ConcurrentHashMap<>();

    /** Runs the single periodic {@link ZkTask}; one thread is sufficient for one task. */
    private final ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();

    /** Netty connection objects currently in rotation, sorted by reported handling time. */
    private final List<RpcClient> clientList = new ArrayList<>();

    /** Connection objects by node name ({@code host:port}), including nodes taken out of rotation. */
    private final Map<String, RpcClient> clientCache = new ConcurrentHashMap<>();

    /**
     * Connects to ZooKeeper, discovers the currently registered nodes, starts watching for changes
     * and schedules the periodic timing report (first run after 1 second, then 5 seconds after
     * each run completes).
     *
     * <p>Any failure during this setup is printed and otherwise ignored, so the instance may end
     * up without clients; {@link #randomClient()} then throws.
     *
     * @param zkServerUrl ZooKeeper connect string
     */
    public ZkClient(String zkServerUrl) {
        try {
            client = CuratorFrameworkFactory.builder()
                    .connectString(zkServerUrl)
                    .sessionTimeoutMs(50000)
                    .connectionTimeoutMs(3000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .build();

            client.start();

            // Load the nodes that are already registered.
            initRpcClients();
            // Watch child nodes for add / delete / update.
            bindListener();
            // Periodically publish the request timing of each netty connection.
            scheduledThreadPool.scheduleWithFixedDelay(new ZkTask(), 1, 5, TimeUnit.SECONDS);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Load balancing: prints every client in rotation and returns the first one, which is the
     * client with the smallest reported handling time as of the last watcher event.
     *
     * @return the client at the head of the rotation list
     * @throws RuntimeException if no client is currently in rotation
     */
    public synchronized RpcClient randomClient() {
        if (clientList.isEmpty()) {
            throw new RuntimeException("cannot find RpcClient");
        }
        System.out.println("------ 所有rpc连接信息(选择handle最小的负载均衡) -------");
        for (RpcClient c : clientList) {
            System.out.print(c.getConnect() + "__" + c.getTime() + "    ");
        }
        System.out.println();
        return clientList.get(0);
    }

    /**
     * Records the duration of one request so the next {@link ZkTask} run can publish it.
     * Only the most recent sample per connection is kept.
     *
     * @param rpcClient connection the request was sent through
     * @param start     request start timestamp in milliseconds
     * @param end       request end timestamp in milliseconds
     */
    public void handleTime(RpcClient rpcClient, long start, long end) {
        String connect = rpcClient.getConnect();
        if (connect == null) {
            // The concurrent map rejects null keys, and a null name cannot map to a node anyway.
            return;
        }
        // Build a fresh object per sample instead of mutating a shared one: the scheduled task
        // may be serializing the previous sample on another thread at this very moment.
        JSONObject sample = new JSONObject();
        sample.put(ConstantsUtils.NODE_DATA_HANDLE, end - start);
        sample.put(ConstantsUtils.NODE_DATA_TIME, System.currentTimeMillis());
        timeCahe.put(connect, sample);
    }

    /**
     * Stops the periodic task and releases the watcher and the ZooKeeper connection.
     * Failures while closing are printed and otherwise ignored.
     */
    public void close() {
        scheduledThreadPool.shutdownNow();
        try {
            if (childrenCache != null) {
                childrenCache.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        if (client != null) {
            client.close();
        }
    }

    /**
     * Periodic task that writes each pending timing sample into its ZooKeeper node.
     *
     * <p>Each node is handled independently: a failure for one node is printed and does not stop
     * the remaining nodes from being published. A sample is dropped once it has been processed,
     * but only if it has not been replaced by a newer one in the meantime.
     */
    class ZkTask implements Runnable {
        @Override
        public void run() {
            for (Map.Entry<String, JSONObject> entry : timeCahe.entrySet()) {
                String nodeName = entry.getKey();
                JSONObject data = entry.getValue();
                try {
                    publishSample(nodeName, data);
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    // Conditional remove keeps a newer sample recorded while this one was sent.
                    timeCahe.remove(nodeName, data);
                }
            }
        }
    }

    /**
     * Writes one timing sample to the node's data. When the handling time exceeds
     * {@link #HANDLE_TIMEOUT_MS} an empty JSON object is written instead; the watcher treats
     * empty node data as "take the node out of rotation".
     */
    private void publishSample(String nodeName, JSONObject data) throws Exception {
        if (data == null || data.isEmpty()) {
            return;
        }
        Object handle = data.get(ConstantsUtils.NODE_DATA_HANDLE);
        if (!(handle instanceof Number)) {
            return;
        }
        long handleMs = ((Number) handle).longValue();
        String path = ConstantsUtils.PARENT_NODE + "/" + nodeName;

        JSONObject payload = data;
        if (handleMs > HANDLE_TIMEOUT_MS) {
            System.out.println("请求超时：" + path + "_" + handleMs);
            payload = new JSONObject();
        }
        client.setData().forPath(path, JSON.toJSONString(payload).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Service discovery: creates a connection for every child currently registered under the
     * parent node. Children whose name cannot be parsed or connected to are skipped.
     *
     * @throws Exception if querying ZooKeeper fails
     */
    private void initRpcClients() throws Exception {
        Stat stat = client.checkExists().forPath(ConstantsUtils.PARENT_NODE);
        if (stat == null) {
            System.out.println("cannot find path:" + ConstantsUtils.PARENT_NODE);
            return;
        }

        for (String pathName : client.getChildren().forPath(ConstantsUtils.PARENT_NODE)) {
            // The node name itself is used as the address.
            RpcClient rpcClient = connect(pathName);
            if (rpcClient != null) {
                putInRotation(pathName, rpcClient);
            }
        }
    }

    /**
     * Service watching: registers a listener for child add / delete / update events and starts
     * the cache. The listener is registered before the cache is started so that no event can be
     * delivered while nobody is listening.
     *
     * @throws Exception if the cache cannot be started
     */
    private void bindListener() throws Exception {
        childrenCache = new PathChildrenCache(client, ConstantsUtils.PARENT_NODE, true);
        childrenCache.getListenable().addListener(this::handleChildEvent);
        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    }

    /** Dispatches one watcher event and re-sorts the rotation list after a child change. */
    private void handleChildEvent(CuratorFramework curator, PathChildrenCacheEvent event) {
        ChildData childData = event.getData();
        if (childData == null) {
            // Events that are not about a specific child carry no data; nothing to update.
            return;
        }
        String fullPath = childData.getPath();
        String pathName = fullPath.substring(fullPath.lastIndexOf('/') + 1);

        switch (event.getType()) {
            case CHILD_ADDED:
                onChildAdded(pathName);
                break;
            case CHILD_REMOVED:
                onChildRemoved(pathName);
                break;
            case CHILD_UPDATED:
                onChildUpdated(curator, fullPath, pathName);
                break;
            default:
                return;
        }
        // Keep the fastest node at the head of the list.
        sortByHandleTime();
    }

    /** Connects to a newly registered node unless it is already known. */
    private void onChildAdded(String pathName) {
        if (clientCache.containsKey(pathName)) {
            // Already connected (e.g. found during the initial discovery); avoid a duplicate.
            return;
        }
        RpcClient rpcClient = connect(pathName);
        if (rpcClient == null) {
            return;
        }
        putInRotation(pathName, rpcClient);
        System.out.println("client listener: server connect " + pathName);
    }

    /** Forgets a node whose registration disappeared. */
    private void onChildRemoved(String pathName) {
        // Remove by node name: the cache is keyed by name, not by client object.
        RpcClient removed = clientCache.remove(pathName);
        if (removed != null) {
            removeFromRotation(removed);
        }
        // NOTE(review): the removed RpcClient is not closed here because its API is not visible
        // from this file - confirm whether it needs an explicit shutdown.
        System.out.println("client listener: server disconnect " + pathName);
    }

    /**
     * Applies the handling time published in the node data. A node whose data carries no usable
     * handling time is taken out of rotation but stays cached, so that it can rejoin the rotation
     * as soon as it publishes valid data again.
     */
    private void onChildUpdated(CuratorFramework curator, String fullPath, String pathName) {
        System.out.println("client listener: server update " + pathName);

        JSONObject nodeData;
        try {
            byte[] bytes = curator.getData().forPath(fullPath);
            nodeData = JSON.parseObject(new String(bytes, StandardCharsets.UTF_8));
        } catch (Exception ex) {
            // Unreadable or non-JSON data: leave the rotation as it is.
            ex.printStackTrace();
            return;
        }

        Long handleMs = readHandleTime(nodeData);
        if (handleMs == null) {
            RpcClient cached = clientCache.get(pathName);
            if (cached != null) {
                removeFromRotation(cached);
            }
            System.out.println("节点数据为空，下线："  + pathName);
            return;
        }

        RpcClient rpcClient = clientCache.get(pathName);
        if (rpcClient == null) {
            // Update for a node we have no connection for (e.g. the earlier connect failed).
            rpcClient = connect(pathName);
            if (rpcClient == null) {
                return;
            }
        }
        rpcClient.setTime(handleMs.longValue());
        putInRotation(pathName, rpcClient);
    }

    /**
     * Extracts the handling time from node data.
     *
     * @return the handling time, or {@code null} when the data is empty or holds no parsable value
     */
    private static Long readHandleTime(JSONObject nodeData) {
        if (nodeData == null || nodeData.isEmpty()) {
            return null;
        }
        Object raw = nodeData.get(ConstantsUtils.NODE_DATA_HANDLE);
        if (raw == null) {
            return null;
        }
        try {
            return Long.parseLong(raw.toString());
        } catch (NumberFormatException ex) {
            ex.printStackTrace();
            return null;
        }
    }

    /**
     * Creates a connection from a node name of the form {@code host:port}.
     *
     * @return the new client, or {@code null} when the name is malformed or connecting failed
     */
    private RpcClient connect(String pathName) {
        String[] host = pathName.split(":");
        if (host.length < 2) {
            System.out.println("cannot parse node name:" + pathName);
            return null;
        }
        try {
            return new RpcClient(host[0], Integer.valueOf(host[1]));
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            ex.printStackTrace();
            return null;
        }
    }

    /** Caches the client under its node name and (re-)appends it to the rotation list. */
    private synchronized void putInRotation(String pathName, RpcClient rpcClient) {
        clientCache.put(pathName, rpcClient);
        clientList.remove(rpcClient);
        clientList.add(rpcClient);
    }

    /** Removes the client from the rotation list without touching the cache. */
    private synchronized void removeFromRotation(RpcClient rpcClient) {
        clientList.remove(rpcClient);
    }

    /** Sorts the rotation list by ascending handling time. */
    private synchronized void sortByHandleTime() {
        // Long.compare instead of casting the difference to int, which can overflow or truncate.
        clientList.sort((o1, o2) -> Long.compare(o1.getTime(), o2.getTime()));
    }
}
